In the intelligent test-log analysis project, Kafka is the key component for message integration. These are quick notes on the commands used and the Docker-based setup.
Setup and deployment
Being half a Docker fanatic myself, I went with Docker first: it has clear advantages in keeping the environment clean and in starting and stopping the application. According to an IBM paper, containers cost little in CPU and memory, while network I/O takes a somewhat larger hit. Native deployment can be reconsidered once a bottleneck shows up in production.
zookeeper
mkdir -p /home/tony/zookeeper/data /home/tony/zookeeper/datalog
docker run -d --restart always --name zookeeper \
  -p 2181:2181 \
  -v /home/tony/zookeeper/data:/data \
  -v /home/tony/zookeeper/datalog:/datalog \
  zookeeper:3.4
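A quick way to check that ZooKeeper is answering is its built-in ruok four-letter command. The sketch below just opens a socket to port 2181 and expects the reply imok; it assumes you run it on the host where the container's port is published.

import socket

# send ZooKeeper's 'ruok' health-check command and print the reply
sock = socket.create_connection(('localhost', 2181), timeout=5)
sock.sendall(b'ruok')
print(sock.recv(16))   # a healthy server replies b'imok' and closes the connection
sock.close()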
kafka
There are plenty of images on hub.docker.com. The most popular one has to be built yourself, which is awkward behind a proxy (during the build, wget inside alpine fails on https downloads). Some images bundle ZooKeeper; I ended up choosing one that keeps them separate and exposes data and logs volume mappings.
When connecting to the broker from a remote machine, note that in config/server.properties, advertised.host.name must be set to the externally reachable IP of the host. On newer versions (0.10 and later) you can instead set advertised.listeners=PLAINTEXT://59.64.11.22:9092. A quick way to verify this from a remote client is sketched right after the docker run below.
chown -R 1000:1000 data/ logs/
docker run -d --restart always --name kafka \
  -p 7203:7203 \
  -p 9092:9092 \
  -v /home/tony/kafka/data:/data \
  -v /home/tony/kafka/logs:/logs \
  --link zookeeper:zkhost \
  -e ZOOKEEPER_IP=zkhost \
  -e KAFKA_ADVERTISED_HOST_NAME=10.75.76.163 \
  ches/kafka
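If the advertised address is wrong, remote clients typically fail with timeouts even though the container is up. A minimal kafka-python check, assuming the library is installed on the remote machine and the broker advertises 10.75.76.163 as in the docker run above (the test topic is created further below):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='10.75.76.163:9092',
                         request_timeout_ms=10000)
future = producer.send('test', b'connectivity check')
# get() raises a KafkaError (typically a timeout) if the advertised
# address is not reachable from this machine
print(future.get(timeout=10))
producer.close()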
Console commands
These can be run inside the container, or locally with the latest binary release from the official site.
topic

bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
client
python
According to the official docs and an InfoQ article, confluent-kafka, which wraps the C library librdkafka, gives the best performance in demanding environments. The pure-Python kafka-python is just as complete feature-wise, though, and is a single pip install away. There is also the more "python-like" pykafka, itself a pure-Python implementation that can optionally switch to a librdkafka backend.
kafka-python
producer

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='10.75.76.163:9092', linger_ms=0)
for _ in range(20):
    producer.send('test', b"from_windows4")
producer.close()
consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer("test", bootstrap_servers=['10.75.76.163:9092'], api_version=(0, 10))
for msg in consumer:
    print(msg)
consumer.close()
pykafka
https://github.com/Parsely/pykafka
The examples below are hello-world tests only; in practice you would probably keep producing or consuming in a while(True)-style loop, asynchronously. A rough sketch of that follows the consumer example below.
producer

from pykafka import KafkaClient

client = KafkaClient(hosts="10.75.76.163:9092")
print(client.topics)
topic = client.topics['test']

# synchronous producer: produce() blocks until the message is delivered
with topic.get_sync_producer() as producer:
    for i in range(4):
        producer.produce(('pykafka produce test sync_message ' + str(i ** 2)).encode('utf-8'))

# asynchronous producer: produce() returns immediately
with topic.get_producer() as producer:
    for i in range(5):
        producer.produce(('pykafka produce test async_message ' + str(i ** 2)).encode('utf-8'))
consumer

from pykafka import KafkaClient

client = KafkaClient(hosts="10.75.76.163:9092")
print(client.topics)
topic = client.topics['test']

consumer = topic.get_simple_consumer()
for message in consumer:
    if message is not None:
        print(message.offset, message.value)
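For something closer to the while(True) style mentioned above, here is a minimal sketch of a long-running consumer with a consumer group and periodic offset commits; the group name log-analysis-group is made up for illustration, and the broker and topic are the same as above.

from pykafka import KafkaClient

client = KafkaClient(hosts="10.75.76.163:9092")
topic = client.topics['test']

# With a consumer_group set, offsets are committed periodically, so a
# restarted consumer resumes roughly where the previous run left off.
consumer = topic.get_simple_consumer(
    consumer_group=b'log-analysis-group',  # hypothetical group name
    auto_commit_enable=True,
    auto_commit_interval_ms=1000,
)
try:
    # iterating blocks and keeps polling; this loop runs until interrupted
    for message in consumer:
        if message is not None:
            print(message.offset, message.value)
finally:
    consumer.stop()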
confluent-kafka
Getting this library set up on Windows is a hassle, so I am putting it off for now and will revisit it only if a performance bottleneck shows up.
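For reference, a minimal confluent-kafka producer would look roughly like the sketch below. It has not been run here (the library was not installed), and it assumes the same broker and test topic as the other examples.

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': '10.75.76.163:9092'})

def delivery_report(err, msg):
    # called once per message to report delivery success or failure
    if err is not None:
        print('Delivery failed: {}'.format(err))
    else:
        print('Delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for i in range(10):
    producer.produce('test', 'confluent message {}'.format(i).encode('utf-8'),
                     callback=delivery_report)
    producer.poll(0)   # serve delivery callbacks

producer.flush()       # block until all outstanding messages are delivered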
Java
This basically follows the official examples. The Java client docs are not under the clients section but under the API section (https://kafka.apache.org/documentation/#producerapi).
pom dependency

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>
producer https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.75.76.163:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 10; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("test", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
consumer https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.75.76.163:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
}